以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫
在課程中,爬取的資料會同時以 RAG 和 fine-tune 兩種方式來提升 LLM 回應的表現,而無論是哪一個流程,都會涉及資料的處理流程。從原始資料到最終結果,資料會經歷多次狀態轉換,包括清理、分塊以及嵌入等步驟。在這些過程中,使用結構化的模型來管理資料狀態不僅能確保資料的一致性,還能有效地減少錯誤發生的機會。
接下來會針對這一天的課程,將核心概念整理成兩天的篇幅,而今天將深入探討如何運用 Pydantic 模型來管理資料狀態,並展示如何通過 分派層(dispatcher) 動態處理不同狀態的資料,以提高整體流程的可靠性與擴展性。
在資料流處理中,資料會經歷四個主要的狀態:原始資料(Raw)、清理後的資料(Cleaned)、分塊後的資料(Chunked)、以及嵌入後的資料(Embedded)。每個階段的資料具有不同的結構,並且會繼承兩個基礎模型:DataModel 和 DBDataModel。
這些模型設計的核心目的是確保在不同階段中,資料的一致性與結構的清晰性,同時提供儲存該階段資料的方法。
在深入探討四個階段的資料模型之前,先來看兩個基礎模型:
DataModelentry_id 和 type,用於標識每筆資料的唯一 ID 及其類型。from abc import ABC
from pydantic import BaseModel
class DataModel(ABC, BaseModel):
    """
    用於所有資料模型的抽象類。
    """
    entry_id: int
    type: str  # 用來標示資料的類型
DBDataModelDataModel,並且加入了資料保存方法 save() 的抽象定義,適用於那些需要被序列化並保存到資料庫中的資料模型。from abc import ABC, abstractmethod
class DBDataModel(DataModel):
    """
    用於需要被序列化,並保存到向量數據庫中的抽象類。
    """
    @abstractmethod
    def save(self):
        pass
資料在四個主要階段會使用各自的 Pydantic 模型來管理結構和狀態。以下介紹每個階段的模型及其繼承關係。
PostsRawModel原始資料(Raw) 是資料流的初始狀態。來自外部數據源(如 RabbitMQ)並未經過處理。在這個階段,資料還沒有被格式化或清理,僅僅是以原始形態存儲。
class PostsRawModel(DataModel):
    platform: str
    content: dict
    author_id: str
    image: Optional[str] = None
platform:資料來源,如 "Twitter"。content:貼文的具體內容,使用字典來儲存。author_id:貼文的作者 ID。image:圖片 URL,不一定要填寫。這個模型代表了數據流的初始形態,無需進行資料保存。
PostCleanedModel經過清理後,資料轉換為**清理後的資料(Cleaned)**狀態。在這個階段,資料中的無效字符和雜訊被清除,並且格式化為可進行進一步處理的形態。由於這個階段的資料需要保存到資料庫,因此模型繼承了 DBDataModel 並實現了 save() 方法。
class PostCleanedModel(DBDataModel):
    platform: str
    cleaned_content: str
    author_id: str
    image: Optional[str] = None
    def save(self) -> tuple:
        data = {...}
        
        return self.entry_id, data
PostCleanedModel 實現了 save() 方法,將清理後的資料保存至資料庫,並返回 entry_id 及清理後的資料。
PostChunkModel在某些應用中,資料的文本過於龐大,因此需要將其分塊以便於處理,這就是**分塊後的資料(Chunked)**階段。在這裡,我們對資料進行切割,使每個分塊能夠獨立處理。這個模型繼承了 DataModel,不需要實現 save() 方法。
class PostChunkModel(DataModel):
    entry_id: str
    platform: str
    chunk_id: str
    chunk_content: str
    author_id: str
    image: Optional[str] = None
    type: str
chunk_id:每個分塊的唯一 ID。chunk_content:分塊後的具體文本內容。author_id:作者 ID,與原始資料一致。image:圖片 URL,與原始資料一致。這個模型代表了資料被分割為多個塊的狀態,方便後續處理。
PostEmbeddedChunkModel最後一步,**嵌入後的資料(Embedded)**將文本轉換為嵌入向量,這些向量會被用於檢索、分類或其他機器學習任務。由於嵌入後的資料需要保存到向量資料庫,這個模型繼承了 DBDataModel,並實現了 save() 方法。
class PostEmbeddedChunkModel(DBDataModel):
    platform: str
    chunk_id: str
    embedded_content: np.ndarray
    author_id: str
    def save(self) -> tuple:
        data = {...}
        
        return self.chunk_id, self.embedded_content, data
embedded_content:通過嵌入模型生成的數值向量(通常是 Numpy 陣列)。chunk_id:對應於分塊資料中的 chunk_id,用來標識哪個分塊。author_id:與原始資料保持一致。PostEmbeddedChunkModel 實現了 save() 方法,將嵌入後的向量數據保存至資料庫,並返回相關數據。
在資料處理管道中,資料的清理是一個至關重要的環節。為了實現靈活且擴展性強的資料清理邏輯,我們可以設計一個基於抽象類(abstract class) 、 具體處理器(concrete handler) 、 工廠模式(factory pattern) 和 分派層(dispatcher) 的架構。
這一章節將逐步介紹每個組件的角色和其相互之間的協作方式。
CleaningDataHandler:抽象清理處理器類別首先,我們定義一個抽象基礎類 CleaningDataHandler,它作為所有清理邏輯處理器的父類。這個類別包含一個抽象方法 clean(),每個具體的清理處理器都需要實現該方法。
from abc import ABC, abstractmethod
class CleaningDataHandler(ABC):
    """
    Abstract class for all cleaning data handlers.
    """
    @abstractmethod 
    def clean(self, data_model: DataModel) -> DataModel:
        pass
clean():該方法將根據具體資料模型的狀態進行清理,並返回清理後的資料模型。clean() 方法,這樣可以在不同資料類型之間統一清理邏輯的接口。PostCleaningHandler:具體的清理處理器具體的資料類型清理邏輯會由繼承自 CleaningDataHandler 的處理器來實現。在這裡,以 PostCleaningHandler 為例,展示如何清理貼文資料(PostsRawModel)並將其轉換為清理後的資料模型(PostCleanedModel)。
class PostCleaningHandler(CleaningDataHandler):
    def clean(self, data_model: PostsRawModel) -> PostCleanedModel:
        return PostCleanedModel(
            entry_id=data_model.entry_id,
            platform=data_model.platform,
            cleaned_content=clean_text("".join(data_model.content.values())),
            author_id=data_model.author_id,
            image=data_model.image if data_model.image else None,
            type=data_model.type,
        )
clean() 方法:該方法接受 PostsRawModel 類型的資料模型,對其內容進行清理(如移除無效字符),並返回一個新的 PostCleanedModel。cleaned_content,其他屬性如 platform 和 author_id 則保持原樣。CleaningHandlerFactory:處理器工廠為了簡化清理邏輯處理器的選擇過程,我們引入工廠模式。CleaningHandlerFactory 根據資料的 type 屬性,動態生成並返回相應的清理處理器。
class CleaningHandlerFactory:
    @staticmethod
    def create_handler(data_type) -> CleaningDataHandler:
        if data_type == "posts":
            return PostCleaningHandler()
        elif data_type == "articles":
            return ArticleCleaningHandler()
        elif data_type == "repositories":
            return RepositoryCleaningHandler()
create_handler() 方法:根據資料的 type,該方法會返回相應的處理器類別。例如,對於 type = "posts",將返回 PostCleaningHandler;對於 type = "articles",將返回 ArticleCleaningHandler。CleaningDispatcher:清理邏輯分派層為了靈活處理不同類型的資料,我們設計了一個分派層(dispatcher),負責根據資料的 type 動態選擇合適的清理處理器。在這裡,CleaningDispatcher 根據資料類型進行分派,並調用相應的清理邏輯。
class CleaningDispatcher:
    cleaning_factory = CleaningHandlerFactory()
    @classmethod
    def dispatch_cleaner(cls, data_model: DataModel) -> DataModel:
        data_type = data_model.type
        handler = cls.cleaning_factory.create_handler(data_type)
        clean_model = handler.clean(data_model)
        return clean_model
dispatch_cleaner() 方法:根據資料模型的 type 屬性,分派器會選擇合適的處理器,並調用處理器的 clean() 方法進行資料清理。ref.